package com.chencong.online.driver.ods;

import com.chencong.online.env.FlinkEnv;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author chencong
 * @Description
 * @Date 3:04 下午 2022/3/15
 * @Param
 **/
public class OdsFlinkSqlCdcDriver {
    public static void main(String[] args) {
        StreamTableEnvironment tableEnv = FlinkEnv.FlinkSqlRunEnv();

        //离线标签
        tableEnv.executeSql(
                "CREATE TABLE `user_offline_labels` (\n" +
                        "  `user_id` bigint,\n" +
                        "  `be_show_cnt_24h` bigint,\n" +
                        "  `be_chat_cnt_24h` bigint ,\n" +
                        "  `be_chat_received_cnt_24h` bigint ,\n" +
                        "  `be_chat_cnt_7x24h` bigint,\n" +
                        "  `be_chat_received_cnt_7x24h` bigint,\n" +
                        "  `post_moment_num_30d` bigint,\n" +
                        "  `activity_day_num` bigint,\n" +
                        "  `chat_cnt_30d` bigint,\n" +
                        "  `send_num_48h` bigint,\n" +
                        "  `send_num_30d` bigint,\n" +
                        "  `is_send0_received0_user_7d` tinyint,\n" +
                        "  `is_send_received0_user_7d` tinyint,\n" +
                        "  `be_visit_profile_cnt_24h` bigint,\n" +
                        "  `chat_more_5_cnt_24h` bigint,\n" +
                        "  `is_click_send0_24h` tinyint,\n" +
                        "  `user_fans_num` bigint,\n" +
                        "  `user_post_moment_num` bigint,\n" +
                        "  `user_liked_num` bigint,\n" +
                        "  PRIMARY KEY (`user_id`) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        "  'connector' = 'mysql-cdc',  \n" +
                        "  'hostname' = '10.10.2.58',   \n" +
                        "  'port' = '3306',             \n" +
                        "  'username' = 'dw_rw_lang_partner',      \n" +
                        "  'password' = 'zu1lf5qB7LIkzP+',    \n" +
                        "  'database-name' = 'language_partner', \n" +
                        "  'table-name' = 'user_offline_labels',\n" +
                        "  'server-time-zone'='Asia/Shanghai'\n" +
                        "  )"
        );

        tableEnv.executeSql("select * from user_offline_labels");

    }
}
